
[SPARK-14664][SQL] Implement DecimalAggregates optimization for Window queries #12421

Closed
wants to merge 4 commits

Conversation

dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Apr 15, 2016

What changes were proposed in this pull request?

This PR aims to implement the decimal aggregation optimization for window queries by extending the existing DecimalAggregates rule. Historically, the DecimalAggregates optimizer was designed to transform general sum/avg(decimal) expressions, but it breaks the recently added window queries shown below. These queries work fine when the current DecimalAggregates optimizer is not applied.

Sum

scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").head
java.lang.RuntimeException: Unsupported window function: MakeDecimal((sum(UnscaledValue(a#31)),mode=Complete,isDistinct=false),12,1)
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23]
:     +- INPUT
+- Window [MakeDecimal((sum(UnscaledValue(a#21)),mode=Complete,isDistinct=false),12,1) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#21]
         +- Scan OneRowRelation[]

Average

scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").head
java.lang.RuntimeException: Unsupported window function: cast(((avg(UnscaledValue(a#40)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44]
:     +- INPUT
+- Window [cast(((avg(UnscaledValue(a#42)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5)) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#42]
         +- Scan OneRowRelation[]

After this PR, those queries work fine, and the new optimized physical plans look like the following.

Sum

scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35]
:     +- INPUT
+- Window [MakeDecimal((sum(UnscaledValue(a#33)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),12,1) AS sum(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#33]
         +- Scan OneRowRelation[]

Average

scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47]
:     +- INPUT
+- Window [cast(((avg(UnscaledValue(a#45)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) / 10.0) as decimal(6,5)) AS avg(a) OVER (  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47]
   +- Exchange SinglePartition, None
      +- Generate explode([1.0,2.0]), false, false, [a#45]
         +- Scan OneRowRelation[]

In this PR, the pattern matching for SUM over a window is based on code from @hvanhovell; he should be credited for that work.
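For reference, here is a rough sketch of the shape of the extended rule, inferred from the optimized plans above and from the existing non-window Sum/Average cases of DecimalAggregates (the guards and thresholds MAX_LONG_DIGITS / MAX_DOUBLE_DIGITS mirror those existing cases; this is a sketch, not necessarily the exact merged code):

case we @ WindowExpression(ae @ AggregateExpression(af, _, _, _), _) => af match {
  case Sum(e @ DecimalType.Expression(prec, scale)) if prec + 10 <= MAX_LONG_DIGITS =>
    // Sum over the unscaled Longs, then turn the whole window expression
    // back into a Decimal with widened precision.
    MakeDecimal(we.copy(windowFunction =
      ae.copy(aggregateFunction = Sum(UnscaledValue(e)))), prec + 10, scale)
  case Average(e @ DecimalType.Expression(prec, scale)) if prec + 4 <= MAX_DOUBLE_DIGITS =>
    // Average over the unscaled values, then divide by 10^scale and cast the
    // whole window expression back to the expected Decimal type.
    val newAggExpr = we.copy(windowFunction =
      ae.copy(aggregateFunction = Average(UnscaledValue(e))))
    Cast(
      Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)),
      DecimalType(prec + 4, scale + 4))
  case _ => we
}

The key difference from the broken plans above is that MakeDecimal / Cast wrap the entire WindowExpression rather than the aggregate function inside it, so WindowExec still sees a plain aggregate as the window function.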

How was this patch tested?

Passes the Jenkins tests (with newly added test cases).

@hvanhovell
Contributor

@dongjoon-hyun good catch!

Can't we just make the optimizer respect the window expression? I.e. wrap the entire window expression in the Cast/MakeDecimal expression?

@dongjoon-hyun
Member Author

Oh, thank you for the quick review. Actually, that was the first thing I tried. It raises type-mismatch exceptions because the input schema differs, so it does not seem so straightforward to me.

However, I'll try again according to your comments. Thank you, @hvanhovell !

@SparkQA

SparkQA commented Apr 15, 2016

Test build #55940 has finished for PR 12421 at commit 48003ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @hvanhovell. Following your comments, I've been working on this for a week, but I'm afraid of becoming a bottleneck.
I feel there is a gap between merely preventing the breakage and actually optimizing these queries.
Do you think you could take over this issue? If so, I will close this PR once yours is merged.
I think you are the best person to extend the DecimalAggregates optimizer for window queries.

@SparkQA

SparkQA commented Apr 22, 2016

Test build #56602 has finished for PR 12421 at commit 7f64e62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @hvanhovell .
Could you give me some advice about how to handle this issue and this PR?

@dongjoon-hyun
Member Author

Rebased.

@SparkQA

SparkQA commented Apr 24, 2016

Test build #56840 has finished for PR 12421 at commit 39b5d14.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1321,7 +1321,9 @@ object DecimalAggregates extends Rule[LogicalPlan] {
/** Maximum number of decimal digits representable precisely in a Double */
private val MAX_DOUBLE_DIGITS = 15

-def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressionsDown {
+  case we @ WindowExpression(AggregateExpression(_, _, _, _), _) => we
Contributor

@hvanhovell hvanhovell Apr 24, 2016


Does the following work for sum?

case we @ WindowExpression(ae @ AggregateExpression(Sum(e @ DecimalType.Expression(prec, scale)), _, _, _), _) if prec + 10 <= MAX_LONG_DIGITS =>
  MakeDecimal(we.copy(windowFunction = ae.copy(aggregateFunction = Sum(UnscaledValue(e)))), prec + 10, scale)

Member Author

@dongjoon-hyun dongjoon-hyun Apr 24, 2016


Oh, thank you for coming back, @hvanhovell. I tried that last week, but it hit the following error in WindowExec.scala (last week, the file was still Window.scala).

Unsupported window function: cast(((avg(UnscaledValue(a#14)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
java.lang.RuntimeException: Unsupported window function: cast(((avg(UnscaledValue(a#14)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$1$$anonfun$apply$2.apply(WindowExec.scala:183)

After fixing that, errors occurred while compiling the generated code (the codegen result) due to type mismatches against the input schema (Decimal vs. Long, and vice versa). I also tried changing the input schema and the BoundReferences.

Actually, this is the same observation I mentioned 9 days ago. I tried several other approaches, but none of them was a clean solution, so I decided to ask for your help.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-14664][SQL] Fix DecimalAggregates optimizer not to break Window queries [WIP][SPARK-14664][SQL] Fix DecimalAggregates optimizer not to break Window queries Apr 25, 2016
@dongjoon-hyun
Member Author

dongjoon-hyun commented Apr 25, 2016

Oh, my bad. Today, starting from your code, I tried again to extend the DecimalAggregates optimizer for window queries and managed to implement it. I think I made some mistake last week.

Thank you so much for your review and guidance, @hvanhovell! After this PR passes the final Jenkins test, I will update the PR description and the JIRA as well.

@SparkQA

SparkQA commented Apr 25, 2016

Test build #56881 has finished for PR 12421 at commit d83d8f8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

The failures are in MemorySinkSuite and seem unrelated to this PR. I'll rebase to retrigger Jenkins.

[info] *** 2 TESTS FAILED ***
[error] Failed: Total 1962, Failed 2, Errors 0, Passed 1960, Ignored 28
[error] Failed tests:
[error]     org.apache.spark.sql.streaming.MemorySinkSuite
[error] (sql/test:test) sbt.TestsFailedException: Tests unsuccessful

@SparkQA

SparkQA commented Apr 25, 2016

Test build #56885 has finished for PR 12421 at commit c1d09d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun dongjoon-hyun changed the title [WIP][SPARK-14664][SQL] Fix DecimalAggregates optimizer not to break Window queries [SPARK-14664][SQL] Implement DecimalAggregates optimization for Window queries Apr 25, 2016
@dongjoon-hyun
Member Author

Hi, @hvanhovell.
It's now ready for review again.
Thank you so much; I couldn't have fixed this without your help.

@SparkQA

SparkQA commented Apr 26, 2016

Test build #56935 has finished for PR 12421 at commit 45beddd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

More test cases are added.

@dongjoon-hyun
Member Author

Rebased.

@SparkQA

SparkQA commented Apr 26, 2016

Test build #56968 has finished for PR 12421 at commit 14fe6b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @hvanhovell .
Could you review this PR when you have some time?

@dongjoon-hyun
Member Author

Hi, @rxin .
Could you review and merge this PR?
This fixes a RuntimeException for window aggregation queries.

@hvanhovell
Contributor

@dongjoon-hyun I am gonna get to this today.


test("SPARK-14664: Decimal sum/avg over window should work.") {
checkAnswer(
sqlContext.sql("select sum(a) over () from (select explode(array(1.0,2.0,3.0)) a) t"),
Contributor


Note: you can also use select sum(a) over () from values 1.0, 2.0, 3.0 x(a)
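For example, the explode-based query in the test could be shortened to the inline-table form along these lines (a sketch; the alias x(a) is taken from the suggestion above, not from the merged test code):

sqlContext.sql("select sum(a) over () from values 1.0, 2.0, 3.0 x(a)")  // inline table instead of explode(array(...))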

Member Author


Thank you. I will use this concise form.

@hvanhovell
Contributor

@dongjoon-hyun The PR is in pretty good shape. I left a few small comments/questions.

@dongjoon-hyun
Member Author

@hvanhovell, thank you for taking the time to review my PR several times.

@SparkQA

SparkQA commented Apr 27, 2016

Test build #57147 has finished for PR 12421 at commit 152c6c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 27, 2016

Test build #57149 has finished for PR 12421 at commit b2ac335.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

LGTM

@hvanhovell
Contributor

Merging to master. Thanks!

@asfgit asfgit closed this in af92299 Apr 27, 2016
@dongjoon-hyun
Member Author

Thank you so much, @hvanhovell !

@dongjoon-hyun dongjoon-hyun deleted the SPARK-14664 branch May 12, 2016 01:00