Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into dag-viz-sql
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 8, 2015
2 parents ffd237a + f496bf3 commit 0d49fd6
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ class SparkILoop(
logInfo("Created sql context (with Hive support)..")
}
catch {
case cnf: java.lang.ClassNotFoundException =>
case _: java.lang.ClassNotFoundException | _: java.lang.NoClassDefFoundError =>
sqlContext = new SQLContext(sparkContext)
logInfo("Created sql context..")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class Analyzer(
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
CTESubstitution ::
WindowsSubstitution ::
Nil : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
Expand All @@ -71,6 +75,55 @@ class Analyzer(
extendedResolutionRules : _*)
)

/**
* Substitute child plan with cte definitions
*/
object CTESubstitution extends Rule[LogicalPlan] {
// TODO allow subquery to define CTE
def apply(plan: LogicalPlan): LogicalPlan = plan match {
case With(child, relations) => substituteCTE(child, relations)
case other => other
}

def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
plan transform {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
case u : UnresolvedRelation =>
val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
val withAlias = u.alias.map(Subquery(_, relation))
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
}
}
}

/**
* Substitute child plan with WindowSpecDefinitions.
*/
object WindowsSubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
case plan => plan.transformExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
val errorMessage =
s"Window specification $windowName is not defined in the WINDOW clause."
val windowSpecDefinition =
windowDefinitions
.get(windowName)
.getOrElse(failAnalysis(errorMessage))
WindowExpression(c, windowSpecDefinition)
}
}
}
}

/**
* Removes no-op Alias expressions from the plan.
*/
Expand Down Expand Up @@ -172,36 +225,20 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
val (realPlan, cteRelations) = plan match {
// TODO allow subquery to define CTE
// Add cte table to a temp relation map,drop `with` plan and keep its child
case With(child, relations) => (child, relations)
case other => (other, Map.empty[String, LogicalPlan])
}

realPlan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u)
}
}

Expand Down Expand Up @@ -664,21 +701,6 @@ class Analyzer(
// We have to use transformDown at here to make sure the rule of
// "Aggregate with Having clause" will be triggered.
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
case plan => plan.transformExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
val errorMessage =
s"Window specification $windowName is not defined in the WINDOW clause."
val windowSpecDefinition =
windowDefinitions
.get(windowName)
.getOrElse(failAnalysis(errorMessage))
WindowExpression(c, windowSpecDefinition)
}
}

// Aggregate with Having clause. This rule works with an unresolved Aggregate because
// a resolved Aggregate will not have Window Functions.
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
Expand Down

0 comments on commit 0d49fd6

Please sign in to comment.