[SPARK-42896][SQL][PYTHON] Make mapInPandas / mapInArrow support barrier mode execution
#40520
Changes from all commits
4de9d07
f612bcb
e95fd95
aa7862f
9be4cad
9ebb3f2
85b840b
66f13cb
@@ -28,7 +28,8 @@ import org.apache.spark.sql.execution.SparkPlan
 case class MapInPandasExec(
     func: Expression,
     output: Seq[Attribute],
-    child: SparkPlan)
+    child: SparkPlan,
+    override val isBarrier: Boolean)
A complete implementation would actually be much more complicated than this. The current implementation might work for now because these specific physical plans don't move around much, but it is flaky: physical plans can change via the Catalyst optimizer (e.g., predicate pushdown), whereas barrier execution mode requires that the barrier is created exactly at the location where it's invoked.

Do you mean a pushdown like this: … to …? I don't think we should allow pushdown in this case.

The feature …

I meant that the optimizer might switch the physical plan order in the future, and that's why the implementation is flaky. It works now because we don't switch the order anywhere else.
To have a complete implementation, we should have something like #19873, which brought a lot of side effects and got reverted in the end.

If this is only for internal purposes, let's probably not expose …

To simplify the implementation, we can implement a …

No. It should be a public API. Third-party libraries such as xgboost also need to use it.
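The reordering concern raised in this thread can be illustrated with a toy model. This is only a sketch under stated assumptions: `Plan`, `swap_with_child`, and `describe` are hypothetical names invented here, and nothing below is Catalyst code or the approach taken in this PR. It shows a reordering rule that declines to move an operator across a node flagged as a barrier, so the barrier stays where the user invoked it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Plan:
    """Hypothetical single-child plan node (not a Spark class)."""
    name: str
    child: Optional["Plan"] = None
    is_barrier: bool = False


def swap_with_child(plan: Plan) -> Plan:
    """Toy reordering rule: move `plan` below its child, unless the child is a barrier."""
    child = plan.child
    if child is None or child.is_barrier:
        # Leave the plan untouched so the barrier keeps its original position.
        return plan
    pushed = Plan(plan.name, child.child, plan.is_barrier)
    return Plan(child.name, pushed, child.is_barrier)


def describe(plan: Optional[Plan]) -> str:
    """Render the chain from the leaf up to the root."""
    names = []
    while plan is not None:
        names.append(plan.name + ("[barrier]" if plan.is_barrier else ""))
        plan = plan.child
    return " <- ".join(reversed(names))


scan = Plan("Scan")
plain = Plan("Filter", Plan("MapInPandas", scan))
barrier = Plan("Filter", Plan("MapInPandas", scan, is_barrier=True))

reordered_plain = describe(swap_with_child(plain))
reordered_barrier = describe(swap_with_child(barrier))
```

In this model the non-barrier plan is reordered while the barrier plan is returned unchanged. A real optimizer would need every such rule to respect the flag, which is the maintenance burden the reviewer is pointing at.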
   extends MapInBatchExec {

   override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
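For orientation, here is a minimal sketch of how the `isBarrier` flag in the diff surfaces on the Python side, assuming the PySpark keyword argument is named `barrier` (as in `DataFrame.mapInPandas(func, schema, barrier=...)`). The helper names `identity_batches` and `map_in_barrier_mode` are hypothetical, and `df` is assumed to be an existing Spark DataFrame.

```python
from typing import Any, Iterator


def identity_batches(batches: Iterator[Any]) -> Iterator[Any]:
    """Pass each pandas.DataFrame batch through unchanged."""
    for batch in batches:
        yield batch


def map_in_barrier_mode(df: Any) -> Any:
    """Apply `identity_batches` to `df` with barrier mode requested.

    `df` is assumed to be a pyspark.sql.DataFrame; `barrier=True` asks Spark
    to run the stage in barrier mode, so all of its tasks are launched together.
    """
    return df.mapInPandas(identity_batches, schema=df.schema, barrier=True)
```

A library such as xgboost, mentioned in the review thread, would replace `identity_batches` with a function that coordinates across the concurrently running tasks.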
Should we mark these new arguments with .. versionadded?

Yes, we should, with the .. versionchanged directive.

Let me just make a quick follow-up.

#40571
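As a sketch of what the directives discussed above look like in a numpydoc-style docstring: the function `map_in_pandas_stub` is a hypothetical stand-in, not the PySpark source, and the version number 3.5.0 is an assumption that is not stated in this thread. The follow-up PR may have worded it differently.

```python
def map_in_pandas_stub(func, schema, barrier=False):
    """Hypothetical stand-in used only to show the Sphinx directives.

    .. versionchanged:: 3.5.0
        Added the ``barrier`` parameter (version number assumed).

    Parameters
    ----------
    func : function
        Function applied to an iterator of batches.
    schema : str
        Return type of ``func``.
    barrier : bool, optional, default False
        Use barrier mode execution.

        .. versionadded:: 3.5.0
    """
    return func, schema, barrier


doc = map_in_pandas_stub.__doc__
```

Sphinx renders `versionadded` as a note on the new parameter and `versionchanged` as a note on the function as a whole.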