-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-42896][SQL][PYTHON] Make mapInPandas
/ mapInArrow
support barrier mode execution
#40520
Conversation
mapInPandas
/ mapInArrow` support barrier mode executionmapInPandas
/ mapInArrow
support barrier mode execution
@@ -28,7 +28,8 @@ import org.apache.spark.sql.execution.SparkPlan | |||
case class MapInPandasExec( | |||
func: Expression, | |||
output: Seq[Attribute], | |||
child: SparkPlan) | |||
child: SparkPlan, | |||
override val isBarrier: Boolean) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would require an implementation much complicated than this actually. The current implementation might work for now because these specific physical plans don't move around much for now but the implementation is flaky because the physical plans can change via Catalyst Optimizer (e.g., predicate pushdown) but the barrier execution mode requires that the barrier is created exactly the location where it's invoked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change via Catalyst Optimizer (e.g., predicate pushdown)
Do you mean pushdown like this ?
Filter
MapInPandas
XXOp
....
to
MapInPandas
Filter
XXOp
....
?
I don't think we should allow pushdown in this case. MapInPandas
executes arbitrary user code. Can we modify optimizer code to prevent it change the plan when it finds "is_barrier" set ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The feature mapInPandas / mapInArrow
supporting barrier mode is only for ML use case, we don't need to complicate it. We only need to support such scenario:
input_dataset.mapInPandas(..., is_barrier=True).collect()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant that the optimizers can might switch the physical plan order in the future, and that's why the implementation is flaky. Now this works because we don't switch the order anywhere else.
Can we modify optimizer code to prevent it change the plan when it finds "is_barrier" set ?
To have a complete implementation, we should have something like #19873 which brought a lot of side effect, and it got reverted in the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is only for internal purpose, let's probably don't expose is_barrier: bool = False
to the API surface at least.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To simply the implementation, we can implement a barrierMapInPandasAndCollect
instead, and define a execution plan stage like BarrierMapInPandasAndCollectExec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is only for internal purpose, let's probably don't expose is_barrier: bool = False to the API surface at least.
No. It should be public API. Third-party lib such as xgboost also need to use it.
To address @HyukjinKwon 's concern about optimizer, can we add CC @cloud-fan @mengxr WDYT ? Barrier mode is only used in specific ML case, i.e. in model training routine, we will only use it in one pattern:
and we don't need complex optimization for it. |
hmmm why do we need to care about the optimizer? The optimizer is not sensitive to the physical execution engine, e.g. Presto, Spark, Flink have many similar SQL optimizations. |
I am not familier with optimizer details but it is concern from @HyukjinKwon |
Predicate pushdown is just an example. e.g., you might want to combine adjacent |
I am saying that real power of Catalyst optimizer is to optimize/reorder these logical plans, and I believe that's the reason why barrier execution wasn't introduced in SQL. The barrier has to be created exactly when the call is invoked, in which basically it requires something like #19873 to have a sound implementation. However, I am fine with having this as an exception if you guys are fine with this. |
If it is the only usage case, i think it will be safe to add dedicated logical plan and physical plan for it. |
@cloud-fan What do you think of this ? |
From a SQL engine's point of view, running all tasks at once or batch by batch doesn't matter. It doesn't change the semantics of the SQL operator, and the optimizer doesn't care about it. However, |
mapInPandas
/ mapInArrow
support barrier mode executionmapInPandas
/ mapInArrow
support barrier mode execution
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok lgtm
merged to master :) |
@@ -60,6 +60,7 @@ def mapInPandas( | |||
schema : :class:`pyspark.sql.types.DataType` or str | |||
the return type of the `func` in PySpark. The value can be either a | |||
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. | |||
isBarrier : Use barrier mode execution if True. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should mark these new arguments by .. versionadded
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we should. with the directive .. versionchanged
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me just make a quick followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…urceProfile ### What changes were proposed in this pull request? Support stage-level scheduling for some PySpark DataFrame APIs (mapInPandas and mapInArrow). ### Why are the changes needed? The introduction of barrier mode in Spark, as seen in #40520, allows for the implementation of Spark ML cases (pure Python algorithms) using DataFrame APIs such as mapInPandas and mapInArrow, so it's necessary to enable stage-level scheduling for DataFrame APIs. ### Does this PR introduce _any_ user-facing change? Yes, This PR adds a new argument "profile" for mapInPandas and mapInArrow. ``` python def mapInPandas( self, func: "PandasMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None, ) -> "DataFrame": def mapInArrow( self, func: "ArrowMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None, ) -> "DataFrame": ``` How to use it? take mapInPandas as an example, ``` python from pyspark import TaskContext def func(iterator): tc = TaskContext.get() assert tc.cpus() == 3 for batch in iterator: yield batch df = spark.range(10) from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder treqs = TaskResourceRequests().cpus(3) rp = ResourceProfileBuilder().require(treqs).build df.mapInPandas(func, "id long", False, rp).collect() ``` ### How was this patch tested? The newly added tests can pass, and some manual tests are needed for dynamic allocation on or off. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44852 from wbo4958/df-rp. Lead-authored-by: Bobby Wang <bobwang@nvidia.com> Co-authored-by: Bobby Wang <wbo4958@gmail.com> Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
What changes were proposed in this pull request?
Make mapInPandas / mapInArrow support barrier mode execution
Why are the changes needed?
This is the preparation PR for supporting mapInPandas / mapInArrow barrier execution in spark connect mode. The feature is required by machine learning use cases.
Does this PR introduce any user-facing change?
No.
How was this patch tested?