
Enabling AQE on [databricks] #6461

Merged Sep 12, 2022 (30 commits)

Conversation

@NVnavkumar (Collaborator) commented Aug 31, 2022

Fixes #1059

This branch fixes several issues with enabling Spark Adaptive Query Execution (AQE) in the Databricks Spark environment. This is currently marked WIP while I investigate whether there are any remaining issues with enabling adaptive execution on Databricks. Here are the issues fixed so far:

  1. Implemented certain method calls that are currently required by the Databricks environment.

  2. Implemented handling of a logical plan window optimization that occurs in Databricks distributions but not currently in Apache Spark.

  3. Fixed the Databricks 10.4 shim for GpuShuffleExchangeExec to avoid a duplicate submission of the map stage, which caused a missing job ID reference.

  4. Added 9.1 handling to the AQEUtils shim so that ShuffleExchangeExec falls back to the CPU when AQE is enabled.
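
For context, enabling AQE alongside the plugin comes down to standard Spark settings along these lines (a minimal sketch; exact defaults and supported keys can differ by Databricks runtime):

```
spark.plugins                com.nvidia.spark.SQLPlugin
spark.rapids.sql.enabled     true
spark.sql.adaptive.enabled   true
```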

…rs next

Signed-off-by: Navin Kumar <navink@nvidia.com>
…Stats fix

Signed-off-by: Navin Kumar <navink@nvidia.com>
… to AQE optimizations

Signed-off-by: Navin Kumar <navink@nvidia.com>
@NVnavkumar NVnavkumar self-assigned this Aug 31, 2022
@sameerz sameerz added the task Work required that improves the product but is not user facing label Aug 31, 2022
@NVnavkumar (Collaborator, Author)

build

@NVnavkumar (Collaborator, Author)

Looks like I found a new failure when using SQL UNION; it seems to crash the system:

E                   py4j.protocol.Py4JJavaError: An error occurred while calling o471.collectToPython.
E                   : org.apache.spark.SparkException: Job 3 cancelled as part of cancellation of all jobs
E                   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3030)
E                   	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2918)
E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$doCancelAllJobs$2(DAGScheduler.scala:1283)
E                   	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
E                   	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
E                   	at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:1282)
E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:3253)
E                   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:53)

Might be caused by the NPE here:

22/08/31 22:12:18 ERROR GpuOverrideUtil: Encountered an exception applying GPU overrides java.lang.NullPointerException
java.lang.NullPointerException
	at org.apache.spark.sql.rapids.GpuShuffleEnv$.isRapidsShuffleAvailable(GpuShuffleEnv.scala:119)
	at org.apache.spark.sql.rapids.GpuShuffleEnv$.useGPUShuffle(GpuShuffleEnv.scala:136)
	at com.nvidia.spark.rapids.GpuTransitionOverrides.$anonfun$apply$3(GpuTransitionOverrides.scala:570)
	at com.nvidia.spark.rapids.GpuOverrides$.logDuration(GpuOverrides.scala:474)
	at com.nvidia.spark.rapids.GpuTransitionOverrides.$anonfun$apply$1(GpuTransitionOverrides.scala:564)
	at com.nvidia.spark.rapids.GpuOverrideUtil$.$anonfun$tryOverride$1(GpuOverrides.scala:4376)
	at com.nvidia.spark.rapids.GpuTransitionOverrides.apply(GpuTransitionOverrides.scala:604)
	at com.nvidia.spark.rapids.GpuTransitionOverrides.apply(GpuTransitionOverrides.scala:39)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2(Columnar.scala:566)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2$adapted(Columnar.scala:566)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:566)
	at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:523)
	at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$2(QueryExecution.scala:596)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:596)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:595)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:232)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:151)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:228)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:222)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:298)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:361)
	at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:202)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:225)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:104)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:101)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:803)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:798)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:748)

@NVnavkumar (Collaborator, Author)

Looks like I found a new failure when using SQL UNION; it seems to crash the system:

This is actually an issue in GpuShuffleExchangeExec sending an incorrect job ID (maybe stage ID?) to the scheduler. The current workaround is to fall back ShuffleExchangeExec to the CPU when using Databricks with AQE.

…s to using original Spark implementation to fix concurrency bug in shim

Signed-off-by: Navin Kumar <navink@nvidia.com>
Signed-off-by: Navin Kumar <navink@nvidia.com>
@NVnavkumar (Collaborator, Author)

build

@NVnavkumar (Collaborator, Author)

Fixed the issue in GpuShuffleExchangeExec on Databricks 10.4. On Databricks 9.1, ShuffleExchangeExec will fall back to the CPU when AQE is enabled, due to the complexities of how that environment handles the submission of the map stage. Updated the lead comment to reflect these changes.

@revans2 (Collaborator) commented Sep 2, 2022

Fixed the issue in GpuShuffleExchangeExec on Databricks 10.4. On Databricks 9.1, ShuffleExchangeExec will fall back to the CPU when AQE is enabled, due to the complexities of how that environment handles the submission of the map stage. Updated the lead comment to reflect these changes.

Have we measured the performance impact of this? In many cases AQE is not that big of a performance win, but sending the data to the CPU and back again is a really big performance hit.

@NVnavkumar (Collaborator, Author) commented Sep 2, 2022 via email

@tgravescs (Collaborator) left a comment

Overall this seems fine. I thought at one point you had mentioned implementing computeStats per exec and trying to get a realistic size; did that not work out?

@NVnavkumar (Collaborator, Author)

overall seems fine, I thought at one point you had mentioned implementing computeStats per exec and trying to get a realistic size, did that not work out?

If we wanted to do this correctly and most accurately, it is a bit of an undertaking to implement the PlanVisitor for not just the leaf execs but also some of the operations. In particular, we would potentially have to implement our own join estimation. Also, when I started looking into it, I got some strangely different numbers in our Parquet reading execs versus what Databricks reports. I figure that can be done separately as a future task (if there is any upside to the computation).
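
To make the discussion concrete, the kind of size estimation a stats visitor performs can be sketched as below. This is not the plugin's or Spark's actual API; the function names, type widths, and selectivity factor here are all illustrative assumptions (Spark's real SizeInBytesOnlyStatsPlanVisitor and join estimation are far more involved):

```python
# Rough per-row byte widths for a few SQL types (illustrative values only).
TYPE_WIDTHS = {"int": 4, "long": 8, "double": 8, "string": 20}

def estimate_output_bytes(row_count, schema):
    """Estimate a plan node's output size as rows * sum of column widths.

    `schema` is a list of (name, type) pairs; unknown types get a default
    width of 16 bytes.
    """
    row_width = sum(TYPE_WIDTHS.get(col_type, 16) for _, col_type in schema)
    return row_count * row_width

def estimate_join_rows(left_rows, right_rows, selectivity=0.1):
    """A crude join cardinality guess: cross-product scaled by a
    made-up selectivity factor."""
    return int(left_rows * right_rows * selectivity)
```

The hard part alluded to above is that every operator between the leaves needs its own rule like these, and the estimates compound as they propagate up the plan.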

Signed-off-by: Navin Kumar <navink@nvidia.com>
Signed-off-by: Navin Kumar <navink@nvidia.com>
@NVnavkumar (Collaborator, Author)

Fixed the issue in GpuShuffleExchangeExec on Databricks 10.4. On Databricks 9.1, ShuffleExchangeExec will fall back to the CPU when AQE is enabled, due to the complexities of how that environment handles the submission of the map stage. Updated the lead comment to reflect these changes.

Have we measured the performance impact of this? In many cases AQE is not that big of a performance win, but sending the data to the CPU and back again is a really big performance hit.

Actually, I figured out how to get GPU shuffle back on 9.1; will push an update shortly.

…logic

Signed-off-by: Navin Kumar <navink@nvidia.com>
@NVnavkumar (Collaborator, Author)

build

@NVnavkumar NVnavkumar marked this pull request as ready for review September 6, 2022 23:52
@NVnavkumar NVnavkumar changed the title WIP: Enabling AQE on [databricks] Enabling AQE on [databricks] Sep 6, 2022
Review threads on integration_tests/src/main/python/aqe_test.py
// stage, replacing them with aliases. Also, sometimes children are not provided in the
// initial list of expressions after optimizations, so we add them here, and they will
// be deduped anyways in the other passes
val newChildren = wf.children.map(ce =>
Collaborator:
@mythrocks mind taking a look at this

Collaborator:
Sorry for the delay.

I'm not particularly familiar with the logic behind isPreNeeded, etc. But on discussion with @NVnavkumar, one wonders if we should check why isPreNeeded is turning up false on Databricks, with AQE turned on. We might adjust how isPreNeeded is calculated.

@NVnavkumar (Collaborator, Author)
So, isPreNeeded is false on Databricks, but it's a bit of a red herring in this case. We actually need the window function children in the GpuWindowExec itself; it looks like the extra GpuProject does not help in this case.
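
The dedup behavior described in the code comment above ("they will be deduped anyways in the other passes") can be sketched as follows. This is only an illustration of the idea, not the plugin's actual Scala code; the function name and list-of-strings representation are assumptions:

```python
def add_and_dedupe(exprs, extra_children):
    """Append children missing from the initial expression list, then
    dedupe while preserving first-seen order.

    Mirrors the idea in the GpuWindowExec snippet: children that the
    optimizer dropped from the initial list are re-added, and any
    duplicates introduced by doing so are removed in a later pass.
    """
    seen = set()
    out = []
    for expr in list(exprs) + list(extra_children):
        if expr not in seen:
            seen.add(expr)
            out.append(expr)
    return out
```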

@NVnavkumar (Collaborator, Author)

After some exploration, I have determined that the bug this fixes is not AQE-specific and I'm not 100% confident that this fix is currently the right approach. I have reverted this fix and filed a new issue #6531 to track the bug here.

Signed-off-by: Navin Kumar <navink@nvidia.com>
Signed-off-by: Navin Kumar <navink@nvidia.com>
@NVnavkumar (Collaborator, Author)

build

@NVnavkumar (Collaborator, Author)

build

@NVnavkumar (Collaborator, Author)

build

…k in CI

Signed-off-by: Navin Kumar <navink@nvidia.com>
@NVnavkumar (Collaborator, Author)

build

@NVnavkumar (Collaborator, Author)

build

Successfully merging this pull request may close these issues.

[BUG] adaptive query executor and delta optimized table writes don't work on databricks