Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31809][SQL] Infer IsNotNull from some special equality join keys #28642

Closed
wants to merge 4 commits into from
Closed

[SPARK-31809][SQL] Infer IsNotNull from some special equality join keys #28642

wants to merge 4 commits into from

Conversation

wangyum
Copy link
Member

@wangyum wangyum commented May 26, 2020

What changes were proposed in this pull request?

We can infer IsNotNull from some special equality join keys. For example:

CREATE TABLE t1(a string, b string, c string) using parquet;
CREATE TABLE t2(a string, b decimal(38, 18), c string) using parquet;
SELECT t1.* FROM t1 JOIN t2 ON coalesce(t1.a, t1.b)=t2.a; -- case 1
SELECT t1.* FROM t1 JOIN t2 ON CAST(t1.a AS DOUBLE)=CAST(t2.b AS DOUBLE); -- case 2

The coalesce(t1.a, t1.b) or CAST(t1.a AS DOUBLE) may generate a lot of null values, which will lead to skew join.
After this pr:

== Physical Plan ==
*(5) Project [a#5, b#6, c#7]
+- *(5) SortMergeJoin [coalesce(a#5, b#6)], [a#8], Inner
   :- *(2) Sort [coalesce(a#5, b#6) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(coalesce(a#5, b#6), 200), true, [id=#44]
   :     +- *(1) Filter isnotnull(coalesce(a#5, b#6))
   :        +- Scan hive default.t1 [a#5, b#6, c#7], HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#5, b#6, c#7], Statistics(sizeInBytes=8.0 EiB)
   +- *(4) Sort [a#8 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#8, 200), true, [id=#52]
         +- *(3) Filter isnotnull(a#8)
            +- Scan hive default.t2 [a#8], HiveTableRelation `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#8, b#9, c#10], Statistics(sizeInBytes=8.0 EiB)

Why are the changes needed?

  1. Avoid skew join in some cases.
  2. Hive support this optimization.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and benchmark test:
Case1:

Before this PR After this PR
image image

Case2:

Before this PR After this PR
image image

@SparkQA
Copy link

SparkQA commented May 26, 2020

Test build #123119 has finished for PR 28642 at commit d657299.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum changed the title [SPARK-31809][SQL] Infer IsNotNull for all children of NullIntolerant expression [SPARK-31809][SQL] Infer IsNotNull for non null intolerant child of null intolerant in join condition Jun 5, 2020
testConstraintsAfterJoin(
testRelation.subquery('left),
testRelation.subquery('right),
testRelation.where(IsNotNull(Coalesce(Seq('a, 'b)))).subquery('left),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hive> EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON coalesce(t1.a, t1.b)=t2.a;
OK
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:t1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:t1
          TableScan
            alias: t1
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: COALESCE(a,b) is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: a (type: string), b (type: string), c (type: string)
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 COALESCE(_col0,_col1) (type: string)
                    1 _col0 (type: string)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t2
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: a is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: a (type: string)
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 COALESCE(_col0,_col1) (type: string)
                    1 _col0 (type: string)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Execution mode: vectorized
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink


test("Should not infer IsNotNull for non null-intolerant child from same table") {
comparePlans(Optimize.execute(testRelation.where(Coalesce(Seq('a, 'b)) === 'c).analyze),
testRelation.where(Coalesce(Seq('a, 'b)) === 'c && IsNotNull('c)).analyze)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hive> EXPLAIN SELECT t1.* FROM t1 WHERE coalesce(t1.a, t1.b)=t1.c;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: t1
          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
          Filter Operator
            predicate: (COALESCE(a,b) = c) (type: boolean)
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Select Operator
              expressions: a (type: string), b (type: string), c (type: string)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              ListSink

Time taken: 4.026 seconds, Fetched: 20 row(s)

@apache apache deleted a comment from SparkQA Jun 5, 2020
@wangyum
Copy link
Member Author

wangyum commented Jun 5, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Jun 5, 2020

Test build #123553 has finished for PR 28642 at commit a5f52a8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 7, 2020

Test build #123607 has finished for PR 28642 at commit 65cd324.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1039,7 +1039,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val pythonEvals = collect(joinNode.get) {
case p: BatchEvalPythonExec => p
}
assert(pythonEvals.size == 2)
assert(pythonEvals.size == 4)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon I'm not sure if this change can optimize python udf?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think it's more efficient to have BatchEvalPythonExec more. It will require more Python executions which aren't trivial.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I quickly checked:

== Physical Plan ==
*(3) Project [a#225, b#226, c#236, d#237]
+- *(3) BroadcastHashJoin [cast(pythonUDF0#256 as int)], [cast(pythonUDF0#257 as int)], Inner, BuildRight
   :- BatchEvalPython [udf(cast(a#225 as string))], [pythonUDF0#256]
   :  +- *(1) Project [_1#220 AS a#225, _2#221 AS b#226]
   :     +- *(1) Project [_1#220, _2#221]
   :        +- *(1) Filter isnotnull(cast(pythonUDF0#254 as int))
   :           +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#254]
   :              +- LocalTableScan [_1#220, _2#221]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[2, string, true] as int) as bigint))), [id=#140]
      +- BatchEvalPython [udf(cast(c#236 as string))], [pythonUDF0#257]
         +- *(2) Project [_1#231 AS c#236, _2#232 AS d#237]
            +- *(2) Project [_1#231, _2#232]
               +- *(2) Filter isnotnull(cast(pythonUDF0#255 as int))
                  +- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#255]
                     +- LocalTableScan [_1#231, _2#232]

We should probably avoid inferring the is-not-null filter in this case.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jun 15, 2020

Test build #124032 has finished for PR 28642 at commit 65cd324.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@HyukjinKwon
Copy link
Member

cc @cloud-fan FYI

@SparkQA
Copy link

SparkQA commented Jun 15, 2020

Test build #124038 has finished for PR 28642 at commit 65cd324.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 24, 2020
@wangyum wangyum closed this Sep 24, 2020
@wangyum wangyum reopened this Aug 10, 2021
@wangyum wangyum removed the Stale label Aug 10, 2021
@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Test build #142271 has finished for PR 28642 at commit 65cd324.

  • This patch fails Python style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46779/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46779/

@wangyum wangyum changed the title [SPARK-31809][SQL] Infer IsNotNull for non null intolerant child of null intolerant in join condition [SPARK-31809][SQL] Infer IsNotNull from join condition Aug 10, 2021
@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46784/

@SparkQA
Copy link

SparkQA commented Aug 10, 2021

Test build #142276 has finished for PR 28642 at commit b100902.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46806/

@SparkQA
Copy link

SparkQA commented Aug 11, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46806/

@SparkQA
Copy link

SparkQA commented Aug 11, 2021

Test build #142299 has finished for PR 28642 at commit 3643e3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46988/

@SparkQA
Copy link

SparkQA commented Aug 16, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46988/

@wangyum wangyum changed the title [SPARK-31809][SQL] Infer IsNotNull from join condition [SPARK-31809][SQL] Infer IsNotNull from some special equality join keys Oct 27, 2021
@SparkQA
Copy link

SparkQA commented Oct 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49129/

@SparkQA
Copy link

SparkQA commented Oct 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49129/

Comment on lines 1220 to 1224
private def resultMayBeNull(e: Expression): Boolean = e match {
case Cast(child, dataType, _, _) => !Cast.canUpCast(child.dataType, dataType)
case _: Coalesce => true
case _ => false
}
Copy link
Member Author

@wangyum wangyum Oct 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan @HyukjinKwon It will not infer all equality join keys. For example:

Infer Will not infer
cast(strCol AS double) = doubleCol upper(strCol) = upperStrCol

@SparkQA
Copy link

SparkQA commented Oct 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49130/

@SparkQA
Copy link

SparkQA commented Oct 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49130/

@SparkQA
Copy link

SparkQA commented Oct 27, 2021

Test build #144659 has finished for PR 28642 at commit a9eb7de.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 27, 2021

Test build #144661 has finished for PR 28642 at commit 7796e5c.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49149/

@SparkQA
Copy link

SparkQA commented Oct 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49149/

@SparkQA
Copy link

SparkQA commented Oct 28, 2021

Test build #144680 has finished for PR 28642 at commit c88566a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 28, 2021

Test build #144703 has finished for PR 28642 at commit 919492e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49172/

@SparkQA
Copy link

SparkQA commented Oct 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49172/

private def resultMayBeNull(exp: Expression): Boolean = exp match {
case e if !e.nullable => false
case Cast(child: Attribute, dataType, _, _) => !Cast.canUpCast(child.dataType, dataType)
case c: Coalesce if c.children.forall(_.isInstanceOf[Attribute]) => true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we rely on the NullIntolerant interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can infer NullIntolerant already. For example:

spark.sql("create table t1 (id string, value int) using parquet")
spark.sql("create table t2 (id int, value int) using parquet")

spark.sql("select * from t1 join t2 on t1.id = t2.id").explain("extended")

== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter isnotnull(id#0)
:  +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
   +- Relation default.t2[id#2,value#3] parquet

Cast is NullIntolerant. We can infer IsNotNull(t1.id) already. But I also want to Infer isnotnull(cast(t1.id as int)) because t1.id may contains many strings that can not be casted to int.

@@ -1215,6 +1215,15 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan]
}
}

// Whether the result of this expression may be null. For example: CAST(strCol AS double)
// We will infer an IsNotNull expression for this expression to avoid skew join.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it better to infer IsNotNull(col) instead of IsNotNull(CAST(col AS other_type))?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can infer IsNotNull(col) already. For example:

spark.sql("create table t1 (id string, value int) using parquet")
spark.sql("create table t2 (id int, value int) using parquet")

spark.sql("select * from t1 join t2 on t1.id = t2.id").explain("extended")

Before this pr:

== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter isnotnull(id#0)
:  +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
   +- Relation default.t2[id#2,value#3] parquet

After this pr:

== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter (isnotnull(id#0) AND isnotnull(cast(id#0 as int)))
:  +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
   +- Relation default.t2[id#2,value#3] parquet

Infer isnotnull(cast(t1.id as int)) may filter out many strings that can not be casted to int.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants