Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33442][SQL] Change Combine Limit to Eliminate limit using max row #30368

Closed
wants to merge 17 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeWindowFunctions,
CollapseWindow,
CombineFilters,
CombineLimits,
EliminateLimits,
CombineUnions,
// Constant folding and strength reduction
TransposeWindow,
Expand Down Expand Up @@ -1452,11 +1452,19 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}

/**
* Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
* 1. Eliminate [[Limit]] operators if it's child max row <= limit.
* 2. Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
*/
object CombineLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
object EliminateLimits extends Rule[LogicalPlan] {
private def canEliminate(limitExpr: Expression, childMaxRow: Option[Long]): Boolean = {
limitExpr.foldable && childMaxRow.exists { _ <= limitExpr.eval().toString.toLong }
viirya marked this conversation as resolved.
Show resolved Hide resolved
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
ulysses-you marked this conversation as resolved.
Show resolved Hide resolved
case Limit(l, child) if canEliminate(l, child.maxRows) =>
child

case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
GlobalLimit(Least(Seq(ne, le)), grandChild)
case LocalLimit(le, LocalLimit(ne, grandChild)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class CombiningLimitsSuite extends PlanTest {
Batch("Column Pruning", FixedPoint(100),
ColumnPruning,
RemoveNoopOperators) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Eliminate Limit", FixedPoint(10),
EliminateLimits) ::
Batch("Constant Folding", FixedPoint(10),
NullPropagation,
ConstantFolding,
Expand Down Expand Up @@ -90,4 +90,22 @@ class CombiningLimitsSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("SPARK-33442: Change Combine Limit to Eliminate limit using max row") {
// test child max row <= limit.
val query1 = testRelation.select().groupBy()(count(1)).limit(1).analyze
val optimized1 = Optimize.execute(query1)
val expected1 = testRelation.select().groupBy()(count(1)).analyze
comparePlans(optimized1, expected1)

// test child max row > limit.
val query2 = testRelation.select().groupBy()(count(1)).limit(0).analyze
val optimized2 = Optimize.execute(query2)
comparePlans(optimized2, query2)

// test child max row is none
val query3 = testRelation.select(Symbol("a")).limit(1).analyze
val optimized3 = Optimize.execute(query3)
comparePlans(optimized3, query3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Limit pushdown", FixedPoint(100),
LimitPushDown,
CombineLimits,
EliminateLimits,
ConstantFolding,
BooleanSimplification) :: Nil
}
Expand Down Expand Up @@ -74,7 +74,7 @@ class LimitPushdownSuite extends PlanTest {
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1))).analyze
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
== Physical Plan ==
TakeOrderedAndProject (41)
* Sort (41)
+- * HashAggregate (40)
+- Exchange (39)
+- * HashAggregate (38)
Expand Down Expand Up @@ -229,7 +229,7 @@ Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net
Aggregate Attributes [3]: [sum(UnscaledValue(cs_ext_ship_cost#6))#22, sum(UnscaledValue(cs_net_profit#7))#23, count(cs_order_number#5)#27]
Results [3]: [count(cs_order_number#5)#27 AS order count #30, MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#6))#22,17,2) AS total shipping cost #31, MakeDecimal(sum(UnscaledValue(cs_net_profit#7))#23,17,2) AS total net profit #32]

(41) TakeOrderedAndProject
(41) Sort [codegen id : 8]
Input [3]: [order count #30, total shipping cost #31, total net profit #32]
Arguments: 100, [order count #30 ASC NULLS FIRST], [order count #30, total shipping cost #31, total net profit #32]
Arguments: [order count #30 ASC NULLS FIRST], true, 0

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TakeOrderedAndProject [order count ,total shipping cost ,total net profit ]
WholeStageCodegen (8)
WholeStageCodegen (8)
Sort [order count ]
HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
InputAdapter
Exchange #1
Expand Down
Loading