Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42896][SQL][PYTHON] Make mapInPandas / mapInArrow support barrier mode execution #40520

Closed
wants to merge 8 commits into from

Conversation

WeichenXu123
Copy link
Contributor

@WeichenXu123 WeichenXu123 commented Mar 22, 2023

What changes were proposed in this pull request?

Make mapInPandas / mapInArrow support barrier mode execution

Why are the changes needed?

This is the preparation PR for supporting mapInPandas / mapInArrow barrier execution in spark connect mode. The feature is required by machine learning use cases.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@WeichenXu123 WeichenXu123 changed the title [SPARK-42896][SQL][PYSPARK] Make mapInPandas / mapInArrow` support barrier mode execution [SPARK-42896][SQL][PYSPARK] Make mapInPandas / mapInArrow support barrier mode execution Mar 22, 2023
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@@ -28,7 +28,8 @@ import org.apache.spark.sql.execution.SparkPlan
case class MapInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
child: SparkPlan,
override val isBarrier: Boolean)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would require an implementation much complicated than this actually. The current implementation might work for now because these specific physical plans don't move around much for now but the implementation is flaky because the physical plans can change via Catalyst Optimizer (e.g., predicate pushdown) but the barrier execution mode requires that the barrier is created exactly the location where it's invoked.

Copy link
Contributor Author

@WeichenXu123 WeichenXu123 Mar 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change via Catalyst Optimizer (e.g., predicate pushdown)

Do you mean pushdown like this ?

Filter
  MapInPandas
      XXOp
          ....

to

MapInPandas
  Filter
      XXOp
          ....

?

I don't think we should allow pushdown in this case. MapInPandas executes arbitrary user code. Can we modify optimizer code to prevent it change the plan when it finds "is_barrier" set ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The feature mapInPandas / mapInArrow supporting barrier mode is only for ML use case, we don't need to complicate it. We only need to support such scenario:

input_dataset.mapInPandas(..., is_barrier=True).collect()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the optimizers can might switch the physical plan order in the future, and that's why the implementation is flaky. Now this works because we don't switch the order anywhere else.

Can we modify optimizer code to prevent it change the plan when it finds "is_barrier" set ?

To have a complete implementation, we should have something like #19873 which brought a lot of side effect, and it got reverted in the end.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only for internal purpose, let's probably don't expose is_barrier: bool = False to the API surface at least.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To simply the implementation, we can implement a barrierMapInPandasAndCollect instead, and define a execution plan stage like BarrierMapInPandasAndCollectExec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only for internal purpose, let's probably don't expose is_barrier: bool = False to the API surface at least.

No. It should be public API. Third-party lib such as xgboost also need to use it.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Mar 23, 2023

To address @HyukjinKwon 's concern about optimizer,

can we add is_barrier attribute into UnaryNode,
and if optimizer find a node marking is_barrier as True, then skip all optimizations around the node.

CC @cloud-fan @mengxr WDYT ?

Barrier mode is only used in specific ML case, i.e. in model training routine, we will only use it in one pattern:

dataset.mapInPandas(..., is_barrier=True).collect()

and we don't need complex optimization for it.

@cloud-fan
Copy link
Contributor

cloud-fan commented Mar 23, 2023

hmmm why do we need to care about the optimizer? The optimizer is not sensitive to the physical execution engine, e.g. Presto, Spark, Flink have many similar SQL optimizations.

@WeichenXu123
Copy link
Contributor Author

WeichenXu123 commented Mar 23, 2023

hmmm why do we need to care about the optimizer? The optimizer is not sensitive to the physical execution engine, e.g. Preso, Spark, Flink have many similar SQL optimizations.

I am not familier with optimizer details but it is concern from @HyukjinKwon
But note this PR also changes logical plan operator like MapInPandas

@HyukjinKwon
Copy link
Member

Predicate pushdown is just an example. e.g., you might want to combine adjacent MapInPandass but it would need a special handling if is_barrier flag is added.

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Mar 23, 2023

I am saying that real power of Catalyst optimizer is to optimize/reorder these logical plans, and I believe that's the reason why barrier execution wasn't introduced in SQL. The barrier has to be created exactly when the call is invoked, in which basically it requires something like #19873 to have a sound implementation.

However, I am fine with having this as an exception if you guys are fine with this.

@zhengruifeng
Copy link
Contributor

zhengruifeng commented Mar 23, 2023

Barrier mode is only used in specific ML case, i.e. in model training routine, we will only use it in one pattern:

dataset.mapInPandas(..., is_barrier=True).collect()

To simply the implementation, we can implement a barrierMapInPandasAndCollect instead, and define a execution plan stage like BarrierMapInPandasAndCollectExec

If it is the only usage case, i think it will be safe to add dedicated logical plan and physical plan for it.

@WeichenXu123
Copy link
Contributor Author

I am saying that real power of Catalyst optimizer is to optimize/reorder these logical plans, and I believe that's the reason why barrier execution wasn't introduced in SQL. The barrier has to be created exactly when the call is invoked, in which basically it requires something like #19873 to have a sound implementation.

However, I am fine with having this as an exception if you guys are fine with this.

@cloud-fan What do you think of this ?

@cloud-fan
Copy link
Contributor

From a SQL engine's point of view, running all tasks at once or batch by batch doesn't matter. It doesn't change the semantics of the SQL operator, and the optimizer doesn't care about it. However, mapInPandas is a public API and you are free to define what's the expectation of the is_barrier parameter. To make our life easier, we can just define it as "the tasks of running the pandas function must all be launched at once", and it's not a barrier to the SQL operators. Then I think it's fine to just add a flag to the existing MapInPandas operator.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@HyukjinKwon HyukjinKwon changed the title [SPARK-42896][SQL][PYSPARK] Make mapInPandas / mapInArrow support barrier mode execution [SPARK-42896][SQL][PYTHON] Make mapInPandas / mapInArrow support barrier mode execution Mar 24, 2023
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@WeichenXu123 WeichenXu123 marked this pull request as ready for review March 24, 2023 10:20
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok lgtm

@WeichenXu123
Copy link
Contributor Author

merged to master :)

@@ -60,6 +60,7 @@ def mapInPandas(
schema : :class:`pyspark.sql.types.DataType` or str
the return type of the `func` in PySpark. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
isBarrier : Use barrier mode execution if True.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mark these new arguments by .. versionadded?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we should. with the directive .. versionchanged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me just make a quick followup.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WeichenXu123 pushed a commit that referenced this pull request Feb 19, 2024
…urceProfile

### What changes were proposed in this pull request?

Support stage-level scheduling for some PySpark DataFrame APIs (mapInPandas and mapInArrow).

### Why are the changes needed?

The introduction of barrier mode in Spark, as seen in #40520, allows for the implementation of Spark ML cases (pure Python algorithms) using DataFrame APIs such as mapInPandas and mapInArrow, so it's necessary to enable stage-level scheduling for DataFrame APIs.

### Does this PR introduce _any_ user-facing change?
Yes, This PR adds a new argument "profile" for mapInPandas and mapInArrow.

``` python
def mapInPandas(
    self, func: "PandasMapIterFunction",
        schema: Union[StructType, str],
        barrier: bool = False,
        profile: Optional[ResourceProfile] = None,
) -> "DataFrame":

def mapInArrow(
    self, func: "ArrowMapIterFunction",
        schema: Union[StructType, str],
        barrier: bool = False,
        profile: Optional[ResourceProfile] = None,
) -> "DataFrame":
```

How to use it? take mapInPandas as an example,

``` python
from pyspark import TaskContext
def func(iterator):
    tc = TaskContext.get()
    assert tc.cpus() == 3
    for batch in iterator:
        yield batch
df = spark.range(10)

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder
treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build

df.mapInPandas(func, "id long", False, rp).collect()
```

### How was this patch tested?

The newly added tests can pass, and some manual tests are needed for dynamic allocation on or off.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44852 from wbo4958/df-rp.

Lead-authored-by: Bobby Wang <bobwang@nvidia.com>
Co-authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants