[SPARK-20392][SQL] Set barrier to prevent re-entering a tree #19873

Closed
wants to merge 5 commits into from

viirya
Member

@viirya viirya commented Dec 4, 2017

What changes were proposed in this pull request?

The SQL analyzer traverses the whole query plan even when most of it has already been analyzed. This increases the time spent on query analysis, especially for long ML pipelines.

This patch adds a logical node called AnalysisBarrier that wraps an analyzed logical plan to prevent it from being analyzed again. The barrier is applied to the analyzed logical plan in Dataset. It does not change the output of the wrapped logical plan; it only acts as a wrapper that hides the plan from the analyzer. New operations on the dataset are placed on top of the barrier, so only the newly created nodes are analyzed.

The analysis barrier is removed at the end of the analysis stage.
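
The barrier idea described above can be illustrated with a small, self-contained sketch. This is a toy model under stated assumptions, not Spark's implementation: `Plan`, `Relation`, `Project`, `Barrier`, `visitedNodes`, and `eliminateBarriers` are hypothetical names invented for illustration. The only property borrowed from the description is that the barrier behaves like a leaf, so a tree traversal never descends into the plan it wraps.

```scala
// Toy model only: these types are NOT Spark's Catalyst classes.
object BarrierSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(newChildren: Seq[Plan]): Plan

    // Bottom-up rewrite: children first, then this node.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rewritten = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rewritten, identity[Plan])
    }
  }

  case class Relation(name: String) extends Plan {
    val children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  case class Project(label: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }

  // Holds an analyzed subtree but reports no children, so traversals stop here.
  case class Barrier(child: Plan) extends Plan {
    val children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  // Counts how many nodes a full bottom-up pass touches.
  def visitedNodes(plan: Plan): Int = {
    var visited = 0
    plan.transformUp { case p => visited += 1; p }
    visited
  }

  // Final step: strip every barrier, including nested ones.
  def eliminateBarriers(plan: Plan): Plan = plan.transformUp {
    case Barrier(child) => eliminateBarriers(child)
  }

  def main(args: Array[String]): Unit = {
    val analyzed = Project("a", Project("b", Relation("t")))
    val withoutBarrier = Project("new", analyzed)
    val withBarrier = Project("new", Barrier(analyzed))
    println(visitedNodes(withoutBarrier)) // 4: every node is revisited
    println(visitedNodes(withBarrier))    // 2: the new node and the barrier
    println(eliminateBarriers(withBarrier) == withoutBarrier) // true
  }
}
```

In this sketch the cost of a pass depends only on the nodes above the barrier, which is the effect the description aims for on long pipelines.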

How was this patch tested?

Added tests.

@viirya
Member Author

viirya commented Dec 4, 2017

cc @cloud-fan @hvanhovell This is basically the same change as in #17770.

@SparkQA

SparkQA commented Dec 4, 2017

Test build #84417 has finished for PR 19873 at commit 136fd30.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AnalysisBarrier(child: LogicalPlan) extends LeafNode

@viirya
Member Author

viirya commented Dec 4, 2017

retest this please.

@SparkQA

SparkQA commented Dec 4, 2017

Test build #84420 has finished for PR 19873 at commit 136fd30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AnalysisBarrier(child: LogicalPlan) extends LeafNode

@SparkQA

SparkQA commented Dec 4, 2017

Test build #84430 has finished for PR 19873 at commit 9f5a0e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AnalysisBarrier(child: LogicalPlan) extends LeafNode

*
* @param rule the function use to transform this nodes children
*/
def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
Contributor

Can we also remove the analyzed flag in this class?

Member Author

Removed.

@@ -241,7 +241,7 @@ class PlannerSuite extends SharedSQLContext {
test("collapse adjacent repartitions") {
val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
- assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+ assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3)
Contributor

Is this change necessary?

Member Author

@cloud-fan
Contributor

LGTM, also cc @gatorsmile

@@ -280,7 +280,7 @@ object TypeCoercion {
*/
object WidenSetOperationTypes extends Rule[LogicalPlan] {

- def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p if p.analyzed => p
Member

Why?

Member Author

@viirya viirya Dec 5, 2017

Sorry, what do you mean why?

Member

In which cases should we still use the analyzed flag?
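
For context on the question above, the following toy sketch contrasts the two traversal styles visible in this hunk. It assumes that the flag-based `resolveOperators` skips sub-trees already marked as analyzed, as the test suite comment quoted later in this thread describes. `Node`, `markAnalyzed`, `countVisits`, and `samplePlan` are hypothetical names; none of this is Spark's code.

```scala
// Toy model only: Node is a stand-in, not Spark's LogicalPlan.
object AnalyzedFlagSketch {
  final class Node(val name: String, val children: List[Node]) {
    // Mutable marker, set once analysis of this subtree has finished.
    var analyzed: Boolean = false

    def markAnalyzed(): Unit = {
      children.foreach(_.markAnalyzed())
      analyzed = true
    }

    // Flag-aware traversal: returns early for subtrees marked as analyzed.
    def resolveOperators(rule: PartialFunction[Node, Node]): Node =
      if (analyzed) this
      else {
        val rewritten = new Node(name, children.map(_.resolveOperators(rule)))
        rule.applyOrElse(rewritten, identity[Node])
      }

    // Plain traversal: ignores the flag and visits every node.
    def transformUp(rule: PartialFunction[Node, Node]): Node = {
      val rewritten = new Node(name, children.map(_.transformUp(rule)))
      rule.applyOrElse(rewritten, identity[Node])
    }
  }

  // Runs a traversal with a rule that matches everything and counts its calls.
  def countVisits(traverse: PartialFunction[Node, Node] => Node): Int = {
    var count = 0
    traverse { case n => count += 1; n }
    count
  }

  // One new operator on top of an already analyzed two-node subtree.
  def samplePlan(): Node = {
    val analyzedPart = new Node("filter", List(new Node("scan", Nil)))
    analyzedPart.markAnalyzed()
    new Node("project", List(analyzedPart))
  }

  def main(args: Array[String]): Unit = {
    val plan = samplePlan()
    println(countVisits(rule => plan.resolveOperators(rule))) // 1: only the new node
    println(countVisits(rule => plan.transformUp(rule)))      // 3: the flag is ignored
  }
}
```

With a flag, the saving depends on every rule choosing the flag-aware traversal. A barrier node moves that decision into the tree itself, so a rule written with a plain traversal also stops at analyzed sub-trees.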

@@ -666,7 +667,9 @@ class Analyzer(
* Generate a new logical plan for the right child with different expression IDs
* for all conflicting attributes.
*/
- private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+ private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = {
Member

What is oriRight?

Member

Use originalRight

Member Author

Ok.

@@ -470,7 +470,7 @@ case class DataSource(
}.head
}
// For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). This
+ // ordering of data.logicalPlan (partition columns are all moved after data column). This
Member

Get rid of changes in this file.

Member Author

Sure.

case sa @ Sort(_, _, child: Aggregate) => sa

- case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+ case s @ Sort(order, _, oriChild) if !s.resolved && oriChild.resolved =>
Member

Use originalChild

Member Author

Ok.

@@ -1098,7 +1103,8 @@ class Analyzer(
case ae: AnalysisException => s
}

- case f @ Filter(cond, child) if !f.resolved && child.resolved =>
+ case f @ Filter(cond, oriChild) if !f.resolved && oriChild.resolved =>
Member

Use originalChild

@gatorsmile
Member

gatorsmile commented Dec 5, 2017

From the PR description, I cannot tell what changes this PR makes. We need a better description that explains the proposed solution.

Please also explain which cases need special handling, and why.

@SparkQA

SparkQA commented Dec 5, 2017

Test build #84475 has finished for PR 19873 at commit bae034d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 5, 2017

Test build #84477 has finished for PR 19873 at commit 54182bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

@viirya Could you resolve the conflicts?

@@ -881,3 +881,10 @@ case class Deduplicate(

override def output: Seq[Attribute] = child.output
}

/** A logical plan for setting a barrier of analysis */
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
Member

Put the PR description in the comment of this class?

Member Author

Ok.

@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.IntegerType

/**
- * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
- * skips sub-trees that have already been marked as analyzed.
+ * This suite is used to test [[LogicalPlan]]'s `transformUp` plus analysis barrier and make sure
Member

Since both transformUp and transformDown work, create a test case using transformDown. Also update the comments here.

Member Author

Sure.
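
The point that both traversal directions respect the barrier can be checked on a toy tree. This is a minimal sketch under stated assumptions, not the test added in this PR: `Plan`, `Scan`, `Tag`, `Barrier`, and `relabel` are hypothetical names, and the barrier is modeled simply as a node that reports no children.

```scala
// Toy model only: these types are NOT Spark's Catalyst classes.
object TraversalSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(newChildren: Seq[Plan]): Plan

    // Post-order: rewrite children first, then this node.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rewritten = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rewritten, identity[Plan])
    }

    // Pre-order: rewrite this node first, then the children of the result.
    def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
      val afterRule = rule.applyOrElse(this, identity[Plan])
      afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
    }
  }

  case class Scan(table: String) extends Plan {
    val children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  case class Tag(label: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }

  // Reports no children, so neither traversal descends into `child`.
  case class Barrier(child: Plan) extends Plan {
    val children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  // Example rule: relabel every reachable Tag("old", _) node.
  val relabel: PartialFunction[Plan, Plan] = {
    case Tag("old", child) => Tag("new", child)
  }

  def main(args: Array[String]): Unit = {
    val plan = Tag("old", Barrier(Tag("old", Scan("t"))))
    val expected = Tag("new", Barrier(Tag("old", Scan("t"))))
    println(plan.transformUp(relabel) == expected)   // true
    println(plan.transformDown(relabel) == expected) // true
  }
}
```

In both directions only the node above the barrier is relabeled; the wrapped `Tag("old", ...)` is left untouched.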

case Kurtosis(e @ StringType()) => Kurtosis(Cast(e, DoubleType))
}
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan =
plan transformAllExpressions {
Member Author

For indentation...

@SparkQA

SparkQA commented Dec 6, 2017

Test build #84518 has finished for PR 19873 at commit 4775a02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ReqAndHandler(req: Request, handler: MemberHandler)
  • trait TypeCoercionRule extends Rule[LogicalPlan] with Logging

@SparkQA

SparkQA commented Dec 6, 2017

Test build #84520 has finished for PR 19873 at commit d2375e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM

@gatorsmile
Member

gatorsmile commented Dec 6, 2017

Thanks! Merged to master.

@asfgit asfgit closed this in 00d176d Dec 6, 2017
@viirya
Member Author

viirya commented Dec 6, 2017

Thanks! @gatorsmile @cloud-fan
