[SPARK-15076][SQL] Add ReorderAssociativeOperator optimizer #12850
Conversation
Test build #57567 has finished for PR 12850 at commit
Test build #57781 has finished for PR 12850 at commit
Test build #58006 has finished for PR 12850 at commit
Test build #58113 has finished for PR 12850 at commit
Rebased to see the result of re-enabling the Hive queries.
Test build #58246 has finished for PR 12850 at commit
Test build #58519 has finished for PR 12850 at commit
Test build #58648 has finished for PR 12850 at commit
Test build #58875 has finished for PR 12850 at commit
Rebased to trigger the Jenkins test again.
Test build #58884 has finished for PR 12850 at commit
Test build #59114 has finished for PR 12850 at commit
Test build #59531 has finished for PR 12850 at commit
// Use associative property for integral type
case e if e.isInstanceOf[BinaryArithmetic] && e.dataType.isInstanceOf[IntegralType] =>
  e match {
    case Add(Add(a, b), c) if b.foldable && c.foldable => Add(a, Add(b, c))
What about a + 1 + b + 2? I think we need a more general approach, like reordering the Add nodes to put all the literals together.
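The reordering idea can be sketched on a toy expression tree. This is a hypothetical mini-AST for illustration only (Lit, Ref, Plus, and Reassociate are made-up names, not Spark's Catalyst Expression classes): flatten the whole Add chain, pull all literals to one side, fold them into a single constant, and rebuild the chain.

```scala
// Toy AST standing in for Catalyst expressions (names are illustrative).
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Ref(name: String) extends Expr
case class Plus(l: Expr, r: Expr) extends Expr

object Reassociate {
  // Flatten a chain of additions into its operand list.
  def flatten(e: Expr): Seq[Expr] = e match {
    case Plus(l, r) => flatten(l) ++ flatten(r)
    case other      => Seq(other)
  }

  // Gather all literals, fold them into one constant, and rebuild the chain.
  def reorder(e: Expr): Expr = {
    val (lits, others) = flatten(e).partition(_.isInstanceOf[Lit])
    if (lits.size > 1) {
      val folded = Lit(lits.collect { case Lit(v) => v }.sum)
      if (others.isEmpty) folded else Plus(others.reduce(Plus(_, _)), folded)
    } else e
  }
}
```

With this sketch, a + 1 + b + 2 (that is, Plus(Plus(Plus(Ref("a"), Lit(1)), Ref("b")), Lit(2))) becomes Plus(Plus(Ref("a"), Ref("b")), Lit(3)), with the two literals collapsed into a single constant.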
Thank you for the review, @cloud-fan!
I see, that sounds great.
Let me think about how to eliminate all the constants then.
Test build #59645 has finished for PR 12850 at commit
Hi, @cloud-fan.
@@ -742,6 +742,23 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
 * equivalent [[Literal]] values.
 */
object ConstantFolding extends Rule[LogicalPlan] {
  private def isAssociativelyFoldable(e: Expression): Boolean =
Similar to ReorderJoin, we should have a new rule, ReorderAssociativeOperator, to do this optimization, instead of putting it in ConstantFolding.
Oh, that could be.
There is some difference in the level of granularity, though.
The join-related optimizers might later be improved into cost-based optimizers, while the ConstantFolding optimizer is just about removing constants within a single expression.
Do you think it is a good idea to put these different levels of concern together?
I can do this in any way you decide. :)
I think this is OK; BooleanSimplification is also a kind of constant folding, but we made a new rule for it.
Thank you. I see!
Hi, @cloud-fan.
Test build #59691 has finished for PR 12850 at commit
Test build #59690 has finished for PR 12850 at commit
Test build #59700 has finished for PR 12850 at commit
Hi, @cloud-fan.
I discussed it with @davies offline, and here is our conclusion:
In general, we think this feature introduces too much nondeterminism relative to the benefits it brings. What do you think?
Thank you for the deep discussion on this. I think as follows.

For 1), there are machine-generated queries from BI tools, and these are an important category of queries. In many cases, BI tools (or other tools with a UI) generate queries by simple rules, and those rules do not care about the shape of the output queries. Optimization is the role of the DBMS or Spark, so static optimizations are always important. This PR also minimizes the size of the generated code.

For 2), other optimizers already remove or duplicate UDFs; Spark does not give control over the execution order. As you know, we already concluded to leave an explicit note like the following for this (in SPARK-15282 and #13087).
For 3), could you give some problematic real cases? This PR reorders only additions or multiplications, and I think it does not change the final result value. The following is the behavior of current Spark (not this PR; you can see it in the physical plan).

scala> sql("select 2147483640 + a + 7 from (select explode(array(1,2,3)) a)").explain()
== Physical Plan ==
*Project [((2147483640 + a#8) + 7) AS ((2147483640 + a) + 7)#9]
+- Generate explode([1,2,3]), false, false, [a#8]
   +- Scan OneRowRelation[]

scala> sql("select 2147483640 + a + 7 from (select explode(array(1,2,3)) a)").collect()
res1: Array[org.apache.spark.sql.Row] = Array([-2147483648], [-2147483647], [-2147483646])

scala> sql("select a + 2147483647 from (select explode(array(1,2,3)) a)").collect()
res2: Array[org.apache.spark.sql.Row] = Array([-2147483648], [-2147483647], [-2147483646])

scala> sql("select 214748364 * a from (select explode(array(1,2,3)) a)").collect()
res3: Array[org.apache.spark.sql.Row] = Array([214748364], [429496728], [644245092])

scala> sql("select 214748364 * a * 10 from (select explode(array(1,2,3)) a)").collect()
res4: Array[org.apache.spark.sql.Row] = Array([2147483640], [-16], [2147483624])

scala> sql("select a * 2147483640 from (select explode(array(1,2,3)) a)").collect()
res5: Array[org.apache.spark.sql.Row] = Array([2147483640], [-16], [2147483624])

Apparently, the optimization of this PR will work like the above.
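The wrap-around results above are why reordering is value-preserving for integral types: two's-complement Int addition is associative modulo 2^32, so an overflow in an intermediate step cannot change the final wrapped value. A quick plain-JVM check of the first query's arithmetic (a sketch, independent of Spark):

```scala
// Plain JVM Int arithmetic wraps modulo 2^32, so regrouping the operands
// of an addition chain cannot change the final wrapped value.
val results = Seq(1, 2, 3).map { a =>
  val original  = (2147483640 + a) + 7 // overflows mid-chain, like Spark's plan
  val reordered = 2147483640 + (a + 7) // constants combined first
  (original, reordered)
}
// every pair matches; a = 1 gives (-2147483648, -2147483648)
```

This matches the res1 output above: the wrapped values are identical whether or not the constants are grouped first.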
Hi, @cloud-fan and @davies.
UDF is the first thing that came to my mind, and yes, it must be deterministic. But as we have the … You can still improve this PR to handle non-deterministic cases, but that would make this PR more complex and harder to reason about, which may not be worth it. cc @davies
Thank you for the feedback. I'm really happy to have your attention!
I added the missing part,
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  case q: LogicalPlan => q transformExpressionsDown {
how about

def flattenAdd(e: Expression): Seq[Expression] = e match {
  case Add(l, r) => flattenAdd(l) ++ flattenAdd(r)
  case other => other :: Nil
}

...

plan transformAllExpressions {
  case a: Add if a.deterministic && a.dataType.isInstanceOf[IntegralType] =>
    val (foldables, others) = flattenAdd(a).partition(_.foldable)
    if (foldables.size > 1) {
      val foldableExpr = foldables.reduce(Add(_, _))
      val c = Literal.create(foldableExpr.eval(), a.dataType)
      if (others.isEmpty) c else Add(others.reduce(Add(_, _)), c)
    } else {
      a
    }
}

We can duplicate some code for Multiply, and I think this may be more readable than the current version.
I see, that could be.
We also need to add isSingleOperatorExpr there.
Otherwise, flattenAdd(Add(Multiply(1, 2), 3)) -> (3).
flattenAdd(Add(Multiply(1, 2), 3)) will become [Multiply(1, 2), 3], and we won't get a wrong result.
Oh, I see. You generalized my PR again! Great!
It looks like it's not so difficult to handle all the cases; this optimization LGTM.
'b * 1 * 2 * 3 * 4,
'a + 1 + 'b + 2 + 'c + 3,
Rand(0) * 1 * 2 * 3 * 4)
I already added a non-deterministic case here.
Thank you for reconsidering this PR positively. I'll update it soon according to your advice.
@cloud-fan.
Test build #59788 has finished for PR 12850 at commit
Test build #59795 has finished for PR 12850 at commit
  case other => other :: Nil
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressionsDown {
we should do:

plan transform {
  case q: LogicalPlan => q transformExpressionsDown {
    ......
  }
}

Otherwise, here we only optimize the top-level plan.
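The point can be sketched with a toy plan tree. This is a hypothetical Node class for illustration (not Spark's LogicalPlan API): an expression-rewrite applied to a single node touches only that node's expressions, so it must be wrapped in a tree-wide transform to reach every operator.

```scala
// Toy plan node: each operator holds some "expressions" (plain Ints here).
case class Node(name: String, exprs: Seq[Int], children: Seq[Node]) {
  // Analogue of transformExpressionsDown: rewrites this node's expressions only.
  def transformExprs(f: Int => Int): Node = copy(exprs = exprs.map(f))
  // Analogue of plan transform: applies a node-level rule top-down to every node.
  def transform(rule: Node => Node): Node = {
    val applied = rule(this)
    applied.copy(children = applied.children.map(_.transform(rule)))
  }
}

val plan = Node("Project", Seq(1), Seq(Node("Filter", Seq(2), Nil)))
val topOnly    = plan.transformExprs(_ + 10)                 // Filter's exprs stay Seq(2)
val everywhere = plan.transform(n => n.transformExprs(_ + 10)) // Filter's exprs become Seq(12)
```

Applying the rewrite to the root alone leaves the Filter's expressions untouched; wrapping it in transform rewrites both operators, which mirrors why the rule needed `plan transform { case q => q transformExpressionsDown { ... } }`.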
My bad, I changed this in a hurry. I'll fix it soon.
cc @davies, can you take a look?
BTW, it goes without saying ... if you do decide to merge this, don't merge it into branch-2.0.
Test build #59824 has finished for PR 12850 at commit
Thanks, merging to master!
Oh, thank you, @cloud-fan!
What changes were proposed in this pull request?

This issue adds a new optimizer, ReorderAssociativeOperator, that takes advantage of the associative property of integral types. Currently, Spark works like the following.

- Case 1: Spark folds 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + a into 45 + a.
- Case 2: Spark does not fold a + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9.

This PR handles Case 2 for Add/Multiply expressions whose data types are ByteType, ShortType, IntegerType, and LongType. The following is the plan comparison between before and after this issue.

Before

After

This PR was greatly generalized by @cloud-fan's key ideas; he should be credited for the work he did.

How was this patch tested?

Pass the Jenkins tests, including the new test suite.