[SPARK-20392][SQL] Set barrier to prevent re-entering a tree #19873
cc @cloud-fan @hvanhovell Basically, these are the same changes as in #17770.
Test build #84417 has finished for PR 19873 at commit
retest this please.
Test build #84420 has finished for PR 19873 at commit
136fd30 to 9f5a0e4
Test build #84430 has finished for PR 19873 at commit
   *
   * @param rule the function use to transform this nodes children
   */
  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
can we also remove the `analyzed` flag in this class?
Removed.
@@ -241,7 +241,7 @@ class PlannerSuite extends SharedSQLContext {
   test("collapse adjacent repartitions") {
     val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
     def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
-    assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+    assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3)
is it a necessary change?
Please see previous discussion: https://github.com/apache/spark/pull/17770/files#r118480364
LGTM, also cc @gatorsmile
@@ -280,7 +280,7 @@ object TypeCoercion {
    */
   object WidenSetOperationTypes extends Rule[LogicalPlan] {

-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case p if p.analyzed => p
Why?
Sorry, what do you mean why?
In which cases should we still use the `analyzed` flag?
@@ -666,7 +667,9 @@ class Analyzer(
    * Generate a new logical plan for the right child with different expression IDs
    * for all conflicting attributes.
    */
-  private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+  private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = {
What is `oriRight`?
Use `originalRight`.
Ok.
@@ -470,7 +470,7 @@ case class DataSource(
   }.head
   }
   // For partitioned relation r, r.schema's column ordering can be different from the column
-  // ordering of data.logicalPlan (partition columns are all moved after data column). This
+  // ordering of data.logicalPlan (partition columns are all moved after data column). This
Get rid of changes in this file.
Sure.
   case sa @ Sort(_, _, child: Aggregate) => sa

-  case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+  case s @ Sort(order, _, oriChild) if !s.resolved && oriChild.resolved =>
Use `originalChild`.
Ok.
@@ -1098,7 +1103,8 @@ class Analyzer(
     case ae: AnalysisException => s
   }

-  case f @ Filter(cond, child) if !f.resolved && child.resolved =>
+  case f @ Filter(cond, oriChild) if !f.resolved && oriChild.resolved =>
Use `originalChild`.
From the PR description, I am unable to tell what changes this PR makes. We need a better description that explains the proposed solution, which cases need special handling, and why.
bae034d to 54182bf
Test build #84475 has finished for PR 19873 at commit
Test build #84477 has finished for PR 19873 at commit
@viirya Could you resolve the conflicts?
@@ -881,3 +881,10 @@ case class Deduplicate(

   override def output: Seq[Attribute] = child.output
 }
+
+/** A logical plan for setting a barrier of analysis */
+case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
Could you put the PR description in the comment of this class?
Ok.
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType

 /**
- * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
- * skips sub-trees that have already been marked as analyzed.
+ * This suite is used to test [[LogicalPlan]]'s `transformUp` plus analysis barrier and make sure
Since both `transformUp` and `transformDown` work, create a test case using `transformDown`. Also update the comments here.
Sure.
      case Kurtosis(e @ StringType()) => Kurtosis(Cast(e, DoubleType))
    }
  override protected def coerceTypes(plan: LogicalPlan): LogicalPlan =
    plan transformAllExpressions {
For indentation...
Test build #84518 has finished for PR 19873 at commit
Test build #84520 has finished for PR 19873 at commit
LGTM
Thanks! Merged to master.
Thanks! @gatorsmile @cloud-fan
What changes were proposed in this pull request?
The SQL `Analyzer` goes through the whole query plan even when most of it is already analyzed. This increases the time spent on query analysis, especially for long pipelines in ML.

This patch adds a logical node called `AnalysisBarrier` that wraps an analyzed logical plan to prevent it from being analyzed again. The barrier is applied to the analyzed logical plan in `Dataset`. It does not change the output of the wrapped logical plan; it only acts as a wrapper that hides the plan from the analyzer. New operations on the dataset are placed on top of the barrier, so only the newly created nodes are analyzed.

The analysis barrier is removed at the end of the analysis stage.
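To make the barrier idea concrete, here is a minimal toy sketch. It is not Spark's implementation: `Plan`, `Relation`, `Filter`, `Barrier`, `countVisits`, and `removeBarriers` are hypothetical stand-ins invented for illustration. It only shows why a wrapper node that reports no children keeps a bottom-up traversal from re-entering the wrapped subtree, and how the wrapper can be stripped afterwards.

```scala
// Toy model only: these are NOT Spark's classes, just an illustration of the idea.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan

  // Bottom-up rewrite: transform the children first, then this node.
  def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten =
      if (children.isEmpty) this else withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(rewritten, identity[Plan])
  }
}

case class Relation(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

case class Filter(cond: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Reports no children, so traversals treat it as a leaf and never descend into `child`.
case class Barrier(child: Plan) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

object BarrierDemo {
  // Stand-in for an analyzer pass: counts how many nodes a rule is applied to.
  def countVisits(plan: Plan): Int = {
    var visited = 0
    plan.transformUp { case p => visited += 1; p }
    visited
  }

  // Stand-in for stripping barriers once analysis is done.
  def removeBarriers(plan: Plan): Plan = plan match {
    case Barrier(child) => removeBarriers(child)
    case other => other.withChildren(other.children.map(removeBarriers))
  }

  def main(args: Array[String]): Unit = {
    val analyzed = Filter("a > 1", Relation("t"))      // pretend this was analyzed earlier
    val extended = Filter("b < 5", Barrier(analyzed))  // new operation placed on the barrier
    println(countVisits(extended))                     // 2: the new Filter and the Barrier
    println(countVisits(removeBarriers(extended)))     // 3: the whole tree is visible again
  }
}
```

In this sketch the traversal cost depends only on the nodes added above the barrier, which is the effect the description attributes to `AnalysisBarrier` for long pipelines.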
How was this patch tested?
Added tests.